package org.xiaoxin.onecoupon.merchant.admin.mq.producer;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.xiaoxin.onecoupon.merchant.admin.mq.base.BaseSendExtendDTO;
import org.springframework.messaging.Message;

/**
 * RocketMQ 抽象公共发送消息组件
 * @author xiaoxin
 * 2025/4/20
 */
@RequiredArgsConstructor
@Slf4j(topic = "CommonSendProduceTemplate")
public abstract  class AbstractCommonSendProduceTemplate<T> {
    private final RocketMQTemplate rocketMQTemplate;
    /**
     * 构建消息发送事件基础扩充属性实体
     *
     * @param messageSendEvent 消息发送事件
     * @return 扩充属性实体
     */
    protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);
    protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);
    public SendResult sendMessage(T messageSendEvent){
        BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent);
        SendResult sendResult;
        try {
            //构建 Topic 目标落点 formats: `topicName:tags`
            StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic());
            if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())){
                destinationBuilder.append(":").append(baseSendExtendDTO.getTag());
            }
            //延迟时间不为空，发送任意延迟消息，否则发送普通消息
            if (baseSendExtendDTO.getDelayTime()!=null){
               sendResult=rocketMQTemplate.syncSendDeliverTimeMills(
                        destinationBuilder.toString(),
                        buildMessage(messageSendEvent,baseSendExtendDTO),
                        baseSendExtendDTO.getDelayTime()
                );
            }else{
                sendResult=rocketMQTemplate.syncSend(
                        destinationBuilder.toString(),
                        buildMessage(messageSendEvent,baseSendExtendDTO),
                        baseSendExtendDTO.getSentTimeout()
                );
            }
            log.info("[生产者] {} - 发送结果：{}，消息ID：{}，消息Keys：{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys());
        }catch (Throwable ex){
            log.error("[生产者] {} - 消息发送失败，消息体：{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex);
            throw ex;
        }
        return  sendResult;
    }
}
